Amazon KinesisとAWS WAFを利用して、サーバレスでリアルタイムな侵入防止システムを作ってみた
はじめに
AWSチームのすずきです。
ストリーミングデータの処理サービスのAmazon Kinesisを利用して、Webシステムのアクセスログ解析を実施し、 不正アクセス元と判定されたソースIP情報を、Lambdaを利用して、AWS WAFに反映する事で、 サーバレスでリアリタイムな侵入防止システム(IDS)を実現する機会がありました。
今回、その概要について紹介させて頂きます。
構成概要図
AWS WAF
- ELB は WAFに対応した ALB (Application Load Balancer) を利用します。
- AWS WAF は、指定したIPアドレス(Conditions)のアクセス制限を実現するルール、ACLを設定します。
ACLs
Rule
Conditions
EC2
nginx
- EC2上で稼働するWebサーバとして、Nginxを利用しました
- アクセスログは、パース等の利便性に優れたLTSV形式で出力する設定を行いました
-
「nginx.conf」 設定抜粋
http { log_format ltsv "time:$time_iso8601\t" "remote_addr:$remote_addr\t" "request_method:$request_method\t" "request_length:$request_length\t" "request_uri:$request_uri\t" "uri:$uri\t" "query_string:$query_string\t" "status:$status\t" "bytes_sent:$bytes_sent\t" "body_bytes_sent:$body_bytes_sent\t" "http_referer:$http_referer\t" "http_user_agent:$http_user_agent\t" "http_x_forwarded_for:$http_x_forwarded_for\t" "http_x_forwarded_proto:$http_x_forwarded_proto\t" "request_time:$request_time\t" "upstream_response_time:$upstream_response_time\t" "upstream_cache_status:$upstream_cache_status\t" } server { access_log /var/log/nginx/access_ltsv.log ltsv; }
Fleutnd
-
ログコレクタとしてFluentd(td-agent)、AWSへのログ転送は、Kinesis Plugin(kinesis_firehose)を利用しました。
-
「td-agent.conf」 抜粋
<br />type tail format ltsv tag nginx.access path /var/log/nginx/access_ltsv.log pos_file /var/log/td-agent/buffer/access.log.pos type copy @type kinesis_firehose region us-west-2 delivery_stream_name firehose_name buffer_type file buffer_path /tmp/fluent.*.buffer buffer_chunk_limit 8m buffer_queue_limit 64 retry_wait 30s flush_interval 10s retry_limit 5 flush_at_shutdown true
- 「delivery_stream_name」にFirehoseで作成済みのストリーム名を指定します。 - Firehoseへのアクセス権はEC2、IAMロールで付与しています
- バッファ、ログ送付間隔指定を行う事で、ログ転送のタイムラグを縮小しています。
- 参考:fluent-plugin-kinesisでKinesis Streamsにログを送信する
Kinesis
- 2016年12月時点で、Kinesis Firehose、Analyticsの利用可能なオレゴンリージョン(us-west-2)を利用しました。
Kinesis Firehose
- 受信したデータ、一定時間バッファリングした後、S3へ自動保管するFirehoseを利用しました
- Firehoseのデフォルトの上限 (2,000 トランザクション / 秒、5,000レコード / 秒、5 MB/ 秒)を超える利用が予想される場合、AWSサポートに対しFirehoseの上限緩和申請を実施します。
- より多くのログ流量が見込まれる場合や、他のリアルタイム処理を併用する場合、Kinesis Streamsの利用も検討します。
Kinesis Analytics
Source
- Fluentdの出力先としたFirehoseを指定します
- KinesisyAnalyticsは、Firehoseに到着したデータをほぼリアリタイム(数秒程度の遅延)で参照する事が可能です。
- Kinesisに到着したアクセスログは、FluentdによりLTSV→JSON変換されているため、カラム定義などデフォルトのまま処理が可能です。
Real-time analytics
- サンプルSQLとして提供されている「Aggregate function in a tumbling time window」を元に、30秒間に一定条件を満たした接続元IPアドレスを抽出する指定を行いました。
設定画面
SQL
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (http_x_forwarded_for VARCHAR(64), uri VARCHAR(64), http_user_agent VARCHAR(64), request_count INTEGER, request_total_time INTEGER, row_time_iso9601 varchar(32)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "http_x_forwarded_for", "uri" ,"http_user_agent" , COUNT(*) AS request_count , SUM("request_time") AS request_total_time , TIMESTAMP_TO_CHAR('yyyy-MM-dd',MIN("SOURCE_SQL_STREAM_001"."ROWTIME")) || 'T' || TIMESTAMP_TO_CHAR('HH:mm:ss', MIN("SOURCE_SQL_STREAM_001"."ROWTIME")) || 'Z' as row_time_iso9601 FROM "SOURCE_SQL_STREAM_001" WHERE "http_user_agent" not like 'Amazon Route 53 Health Check Service%' and "uri" not like '/wp-admin/%' and "status" <> 304 GROUP BY "http_x_forwarded_for", "uri" ,"http_user_agent" , FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 30 TO SECOND) HAVING COUNT(*) > 20 ;
- 30秒間に20回以上、同一URIへのリクエストが行われたソースIPアドレス(http_x_forwarded_for)を抽出しています。
- 標準SQL準拠した記述により、UA、URI、HTTPレスポンスコードなどによるレコードの絞込、除外を実施しています。
- 後段のLambdaでの処理やログ記録用の情報として、ブロック対象とするIPアドレス以外に、URI、UA、リクエスト数、総所要時間、ISO9601書式の時刻情報を出力します。
-
参考リンク: AWS Kinesis Analytics » SQL Reference » Standard SQL Operators
Destination
- 先の「analytics」で抽出されたレコードの出力先を指定します。
- Lambda連携の為、出力先は 事前に作成した Kinesis Streams、出力形式は「JSON」としました。
Kinesis Streams
- Lambda連携用のKinesis Streams、流量は多くない事が見込まれるため、最小構成(シャード:1)で設置します
Lambda
- Kinesis Analytics により、不正アクセス元とされたIPアドレス、WAFのアクセス制限情報(Condtion)として反映します
- Kinesis Streams のイベントトリガとして設定し、Lambdaの環境変数「IPSetId」で指定したIPリストを更新を行います。
コード
import os import base64 import json import boto3 client = boto3.client('waf-regional', region_name='ap-northeast-1') def lambda_handler(event, context): for record in event['Records']: payload = base64.b64decode(record['kinesis']['data']) data = json.loads(payload) x_forwarded_for = str(data["HTTP_X_FORWARDED_FOR"]) blockip = str(x_forwarded_for.split(",")[0]) + "/32" print(blockip) response = client.update_ip_set( IPSetId = os.environ['IPSetId'], ChangeToken = client.get_change_token()['ChangeToken'], Updates=[ { 'Action': 'INSERT', 'IPSetDescriptor': { 'Type': 'IPV4', 'Value': blockip } }, ] )
- 2016年12月時点、LambdaデフォルトのAWS SDK(Boto3)はALB対応のWAF(waf-regional)に非対応であった為、最新のSDKをデプロイパッケージに組み込み利用しました。
- 最新版のSDK(Boto3)をLambda関数で利用してみた
更新結果
- Lambdaのイベントを有効化する事で、WAFで利用されるIPリストが更新されるようになりました。
まとめ
Kinesis(Firehose、Analytcs、Streams)と、AWS WAFをLambdaで連携させる事で、 不正なアクセス発生をニアリアルタイムに解析し、発生から数分でアクセス遮断を実現する仕組みを サーバレスに実現する事ができました。
EC2を利用した不正アクセス対策を導入済みの環境でも、攻撃によりEC2のリソースがボトルネックとなり、 DDoS攻撃が成立してしまう場合がありましたが、AWSのマネジメントサービスのWAFとKinesis(Firehose、Analytics)は、 必要な性能はAWS側で担保された状態で利用する事が期待できます。
また、EC2のみで攻撃に耐えるインフラをオートスケールなどで実現した場合、 初期の構築コストや、増強中のリソース費用が負担となる事がありますが、 AWS WAFにより不正アクセスの一部を食い止める事で、AWS利用の最適化にも繋がると思われます。
今回紹介させて頂いたのは、簡単な条件を用いた不正アクセスのブロックの実現でしたが、 保護ルールの管理や、システムに合わせた最適化に向けたログの可視化などについては、別の機会に紹介させて頂ければと思います。
参考リンク
AWSブログ
How to Configure Rate-Based Blacklisting with AWS WAF and AWS Lambda
- CloudFrontのアクセスログをLambdaで解析し、不正アクセス元IPをAWS WAFでブロックする仕組みが紹介されています。
- 今回の保護対象は、CloudFront(CDN)経由での公開が難しいシステムだった事。また、不正アクセス元の遮断、短いタイムラグで実現するために、AWS側で取得するログではなく、Nginxで取得したログをKinesisで処理する仕組みを導入しました。
Norikra
Norikra+FluentdでDoS攻撃をブロックする仕組みを作ってみた
- 今回紹介させて頂いた内容と処理概要は同一です
- 不正アクセス判定に利用するCEPエンジン、KinesisAnalyticsの替わりにNorikraが利用されています。
- 不正アクセスの遮断、WAFの替わりにVPCのNACLを利用しています。